Initial implementation of Pipeline DLQ #5277
| output(records, null); | ||
| } | ||
|
|
||
| void output(Collection<T> records, PipelineIf failurePipeline); |
Should we have a setFailurePipeline method instead? I suspect the sinks are going to need to hang on to this in order to use it later in the code.
Also, rather than passing a PipelineIf, I'd probably pass this to the sinks.
interface FailurePipeline<T> {
void writeAll(Collection<T> records);
}
All the sinks care about is writing to the pipeline. I don't think they care about the Source object itself. Also, the Source interface doesn't support writing.
I'd even think that the implementation of FailurePipeline would write directly to the Buffer.
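A minimal sketch of this suggestion, under stated assumptions: the sink receives a narrow write-only interface through a setter and keeps it for later, and the implementation writes straight into a buffer. `BufferBackedFailurePipeline`, `ExampleSink`, and the "empty record fails" rule are hypothetical names and behavior for illustration only, and the `ArrayBlockingQueue` is a stand-in for Data Prepper's Buffer rather than the real class.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical narrow interface: sinks only need to hand failed records over.
interface FailurePipeline<T> {
    void writeAll(Collection<T> records);
}

// Hypothetical implementation that writes directly into a bounded in-memory buffer.
// The queue stands in for Data Prepper's Buffer; it is not the real class.
class BufferBackedFailurePipeline<T> implements FailurePipeline<T> {
    private final BlockingQueue<T> buffer;

    BufferBackedFailurePipeline(final int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    @Override
    public void writeAll(final Collection<T> records) {
        for (final T record : records) {
            // offer() returns false instead of blocking when the buffer is full.
            if (!buffer.offer(record)) {
                throw new IllegalStateException("Failure pipeline buffer is full");
            }
        }
    }

    // Removes and returns everything currently buffered.
    List<T> drain() {
        final List<T> drained = new ArrayList<>();
        buffer.drainTo(drained);
        return drained;
    }
}

// Hypothetical sink that holds on to the failure pipeline for later use.
class ExampleSink {
    private FailurePipeline<String> failurePipeline;

    void setFailurePipeline(final FailurePipeline<String> failurePipeline) {
        this.failurePipeline = failurePipeline;
    }

    void output(final Collection<String> records) {
        final List<String> failed = new ArrayList<>();
        for (final String record : records) {
            // Assumption for the example: empty records cannot be delivered.
            if (record.isEmpty()) {
                failed.add(record);
            }
        }
        if (failurePipeline != null && !failed.isEmpty()) {
            failurePipeline.writeAll(failed);
        }
    }
}
```

With this shape the sink never sees a Source or a full pipeline object, which is the point of the narrower interface.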
@dlvenable Sinks that want to hang on can set that themselves, right? Why introduce a new API when the existing API can serve the purpose?
|
|
||
| import org.opensearch.dataprepper.model.source.Source; | ||
|
|
||
| public interface PipelineIf { |
@dlvenable I guess I meant it to be "Pipeline Interface", as a pointer to the pipeline from Source. I am open to suggestions on the naming.
I needed an interface in the data-prepper-api directory so that I can use it in other interfaces in that directory.
| public class DataPrepperConfiguration implements ExtensionsConfiguration, EventConfigurationContainer { | ||
| static final Duration DEFAULT_SHUTDOWN_DURATION = Duration.ofSeconds(30L); | ||
|
|
||
| static final String DEFAULT_FAILURE_PIPELINE_NAME = "dlq"; |
I think we should give this a different name.
default_failure_pipeline
@dlvenable Raj prefers a well-known name like dlq. My idea was to provide a way to change this name in a future PR.
I think the name dlq already implies something since our DLQs are different from any other concept. So I think a failure pipeline is clearer. Maybe default_dlq_pipeline instead?
Or maybe just dlq_pipeline?
| } | ||
|
|
||
| @Override | ||
| public void sendFailedEvents(Collection<Record<Event>> records) { |
| * @return FailurePipeline returns failure pipeline | ||
| * @since 2.12 | ||
| */ | ||
| default FailurePipeline getFailurePipeline() { |
I don't think we need these in the interface. Each Buffer can handle setFailurePipeline as needed.
| * @return FailurePipeline returns failure pipeline | ||
| * @since 2.12 | ||
| */ | ||
| default FailurePipeline getFailurePipeline() { |
You don't need the getter on the interface.
| * @return FailurePipeline returns failure pipeline | ||
| * @since 2.12 | ||
| */ | ||
| default FailurePipeline getFailurePipeline() { |
You don't need this on the interface.
| * @return FailurePipeline returns failure pipeline | ||
| * @since 2.12 | ||
| */ | ||
| default FailurePipeline getFailurePipeline() { |
You don't need this on the interface.
| try { | ||
| buffer.writeAll(records, DEFAULT_WRITE_TIMEOUT); | ||
| } catch (Exception e) { | ||
| LOG.error("Failed to write to failure pipeline"); |
Will we hit this if the failure pipeline buffer ever gets full?
Yes, I guess I could add some retries here. But overall, we can't wait here forever.
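One way to add retries without waiting forever is to bound both the number of attempts and the wait per attempt. The sketch below is only an illustration under assumptions: `BoundedRetryWriter`, `MAX_ATTEMPTS`, and `WRITE_TIMEOUT_MILLIS` are hypothetical names and values, and a `BlockingQueue` stands in for the failure pipeline's buffer.

```java
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedRetryWriter {
    // Hypothetical limits; the PR does not state specific values.
    static final int MAX_ATTEMPTS = 3;
    static final long WRITE_TIMEOUT_MILLIS = 50;

    // Tries to write every record, retrying a bounded number of times per record.
    // Returns how many records were accepted before giving up.
    static <T> int writeWithRetries(final BlockingQueue<T> buffer, final Collection<T> records) {
        int written = 0;
        for (final T record : records) {
            boolean accepted = false;
            for (int attempt = 1; attempt <= MAX_ATTEMPTS && !accepted; attempt++) {
                try {
                    // Waits at most WRITE_TIMEOUT_MILLIS for space, then reports false.
                    accepted = buffer.offer(record, WRITE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                } catch (final InterruptedException e) {
                    // Preserve the interrupt and stop; the caller decides what to do next.
                    Thread.currentThread().interrupt();
                    return written;
                }
            }
            if (!accepted) {
                System.err.println("Failed to write to failure pipeline after "
                        + MAX_ATTEMPTS + " attempts; giving up on the rest of the batch");
                return written;
            }
            written++;
        }
        return written;
    }
}
```

The worst-case wait for a stuck record is `MAX_ATTEMPTS * WRITE_TIMEOUT_MILLIS`, so the calling thread always makes progress even when the buffer stays full.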
| @@ -107,7 +107,9 @@ Collection runProcessorsAndProcessAcknowledgements(List<Processor> processors, C | |||
| } | |||
| } catch (final Exception e) { | |||
| LOG.error("A processor threw an exception. This batch of Events will be dropped, and their EventHandles will be released: ", e); | |||
Maybe we change this to log that the batch is going to the failure pipeline if one is enabled, and only log the dropped message when the failure pipeline doesn't exist?
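The suggested behavior could look roughly like the following. This is a sketch, not the PR's code: `ProcessorFailureRouting`, its nested `FailurePipeline` interface, and the message wording are hypothetical, and `System.err` stands in for the real logger.

```java
import java.util.Collection;

class ProcessorFailureRouting {
    // Hypothetical stand-in for the failure pipeline; only the send operation matters here.
    interface FailurePipeline {
        void sendEvents(Collection<String> events);
    }

    // Routes the batch and returns the message that was logged, so the choice is easy to inspect.
    static String handleProcessorFailure(final Collection<String> events,
                                         final FailurePipeline failurePipeline,
                                         final Exception cause) {
        final String message;
        if (failurePipeline != null) {
            // A failure pipeline exists: forward the batch and say so.
            failurePipeline.sendEvents(events);
            message = "A processor threw an exception. This batch of Events will be sent to the failure pipeline: " + cause;
        } else {
            // No failure pipeline: the batch is lost, so log the dropped message.
            message = "A processor threw an exception. This batch of Events will be dropped: " + cause;
        }
        System.err.println(message);
        return message;
    }
}
```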
|
I am also thinking that I should rename "FailurePipeline" to "NoSourcePipeline". We may be using this pipeline in other cases as well. @dlvenable what do you think? |
Signed-off-by: Kondaka <krishkdk@amazon.com>
| @JsonProperty("sink") final List<SinkModel> sinks, | ||
| @JsonProperty("workers") final Integer workers, | ||
| @JsonProperty("delay") final Integer delay) { | ||
| checkArgument(Objects.nonNull(source), "Source must not be null"); |
Why is source not required anymore? Doesn't even the DLQ pipeline have a pipeline source?
In the PipelineModel, it does not.
@kkondaka, do we have validations elsewhere on this?
@graytaylor0 DLQ pipelines do not have a configured source because any source, processor, buffer, or sink can send events to the DLQ pipeline. So, the source is the new "HeadlessPipelineSource" that I added. This source is created automatically for a DLQ pipeline. It is not configurable.
@dlvenable Yes, there are validations that fail if a source is not specified.
| for (Map.Entry<String, Pipeline> pipelineEntry : pipelineMap.entrySet()) { | ||
| if (!(pipelineEntry.getKey().equals(failurePipelineName))) { | ||
| pipelineEntry.getValue().setFailurePipeline(failurePipeline); | ||
| acknowledgementsEnabled = acknowledgementsEnabled || pipelineEntry.getValue().areAcknowledgementsEnabled(); |
Why not just do
acknowledgementsEnabled = pipelineEntry.getValue().areAcknowledgementsEnabled();
If there are 10 sub-pipelines, and the first 9 have acks enabled and the 10th doesn't, then the final result would be false! (In fact, this shouldn't happen, but I just want to be sure.) We want to "release events" when acks are enabled.
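The difference between the two forms can be shown in isolation. In this sketch, `AckAggregation` and its method names are hypothetical, and a list of booleans stands in for calling `areAcknowledgementsEnabled()` on each pipeline.

```java
import java.util.List;

class AckAggregation {
    // Plain assignment overwrites on every iteration, so only the last pipeline decides the result.
    static boolean lastWins(final List<Boolean> ackFlags) {
        boolean enabled = false;
        for (final boolean flag : ackFlags) {
            enabled = flag;
        }
        return enabled;
    }

    // OR-accumulation keeps the result true once any pipeline has acknowledgements enabled.
    static boolean anyEnabled(final List<Boolean> ackFlags) {
        boolean enabled = false;
        for (final boolean flag : ackFlags) {
            enabled = enabled || flag;
        }
        return enabled;
    }
}
```

For nine pipelines with acknowledgements enabled followed by one without, `lastWins` yields false while `anyEnabled` yields true, which is the case described above.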
| numberOfEventsSuccessful.increment(records.size()); | ||
| break; | ||
| } catch (Exception e) { | ||
| LOG.error(NOISY, "Failed to write to failure pipeline"); |
We should log the exception message here.
| LOG.error("A processor threw an exception. This batch of Events will be dropped, and their EventHandles will be released: ", e); | ||
| if (inputEvents != null) { | ||
| if (pipeline.getFailurePipeline() != null) { | ||
| pipeline.getFailurePipeline().sendEvents(records); |
We may still want a log here:
LOG.error("A processor threw an exception. This batch of Events will be sent to the pipeline DLQ, and their EventHandles will be released: ", e);
@graytaylor0 LOG.error() automatically logs the exception, right?
| } | ||
|
|
||
| @Test | ||
| //@Timeout(value = 2000, unit = TimeUnit.MILLISECONDS) |
| Collection<Record<Event>> records = mock(Collection.class); | ||
| failurePipeline.sendEvents(records); | ||
| verify(headlessPipelineSource).sendEvents(records); | ||
| //assertThat(testPipeline.areAcknowledgementsEnabled(), equalTo(false)); |
| processorSets.forEach(processorSet -> processorSet.forEach(processor -> { | ||
| assertThat(((TestProcessor)processor).getFailurePipeline(), equalTo(failurePipeline)); | ||
| })); | ||
| for (Sink sink: sinks) { |
Assert the size of this collection so that we know this loop runs.
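The reason for the size assertion is that a loop over an empty collection passes trivially. A plain-Java sketch of the guard follows; the actual test uses JUnit and Hamcrest, and `LoopGuardExample`, `TestSink`, and `verifySinks` here are hypothetical stand-ins.

```java
import java.util.List;

class LoopGuardExample {
    // Hypothetical stand-in for a sink that records which failure pipeline it was given.
    static class TestSink {
        Object failurePipeline;
    }

    static void verifySinks(final List<TestSink> sinks,
                            final int expectedCount,
                            final Object expectedFailurePipeline) {
        // Guard first: without it, an empty list passes because the loop body never runs.
        if (sinks.size() != expectedCount) {
            throw new AssertionError("Expected " + expectedCount + " sinks but found " + sinks.size());
        }
        for (final TestSink sink : sinks) {
            // Identity comparison: every sink must hold the very same failure pipeline object.
            if (sink.failurePipeline != expectedFailurePipeline) {
                throw new AssertionError("Sink was not given the failure pipeline");
            }
        }
    }
}
```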
I like |
|
@kkondaka, the builds are all failing. Please take a look. |
Signed-off-by: Kondaka <krishkdk@amazon.com>
dlvenable
left a comment
Thanks! This will be a significant improvement to how Data Prepper handles errors!
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.